This week's seminar is concerned with Reinforcement Learning. We are going to demonstrate standard Q-Learning algorithms as well as Deep Q-Learning using OpenAI Gym.
Further we will explore some classical tasks from the book Reinforcement Learning - Barto and Sutton.
from IPython.display import HTML, display
import warnings
warnings.filterwarnings("ignore")
import matplotlib.pyplot as plt
import numpy as np
from matplotlib import rc
import gym
# Reproducibility: fix the NumPy RNG so the bandit draws are repeatable.
np.random.seed(10)
# Per-arm success probabilities of the Bernoulli bandits (arm 1 is the best).
bandit_probabilities = [0.10, 0.40, 0.10, 0.10]
number_of_bandits = len(bandit_probabilities)
action_space = np.arange(number_of_bandits)  # =[0, 1, 2, 3]
number_of_trials = 20  # independent repetitions used for averaging curves
timesteps = 1000  # arm pulls per trial
# Scratch buffer: records which arm was pulled at each step of a trial.
arms = np.zeros(timesteps, dtype=int)
def step(action):
    """Pull bandit arm ``action``; return a Bernoulli reward (1.0 or 0.0)."""
    draw = np.random.random()  # uniform in [0.0, 1.0)
    return 1.0 if draw < bandit_probabilities[action] else 0.0
def cumulative_average_mean(r, n):
    """Running mean of the reward sequence ``r``.

    Parameters
    ----------
    r : array-like of length ``n``
        Per-step rewards.
    n : int
        Number of steps in ``r``.

    Returns
    -------
    np.ndarray
        Element ``t`` equals ``mean(r[:t+1])``.
    """
    # np.arange(1, n + 1) replaces the original np.cumsum(np.ones(n)):
    # same divisor sequence [1, 2, ..., n] without allocating and scanning
    # a temporary array of ones.
    return np.cumsum(r) / np.arange(1, n + 1)
def compute_regret(bandit_probabilities, arms, time_steps):
    """Cumulative regret of the pulled arms against always pulling the best arm.

    Parameters
    ----------
    bandit_probabilities : sequence of float
        Expected reward of each arm.
    arms : sequence of int
        Arm pulled at each timestep (length ``time_steps``).
    time_steps : int
        Number of timesteps.

    Returns
    -------
    np.ndarray
        Element ``t`` is the expected cumulative reward of the optimal arm
        minus that of the executed policy, up to step ``t``.
    """
    probs = [bandit_probabilities[arm] for arm in arms]
    # Expected cumulative reward of the optimal arm: [1, 2, ..., T] * p_max.
    # np.arange replaces the original np.cumsum(np.ones(...)) — same values,
    # no temporary array.
    optimal_value = np.arange(1, time_steps + 1) * np.amax(bandit_probabilities)
    policy_value = np.cumsum(probs)
    return optimal_value - policy_value
def epsilon_greedy_policy(epsilon, actions, q_values):
    """Epsilon-greedy action selection.

    With probability ``epsilon`` pick a uniformly random action; otherwise
    pick uniformly among the actions whose Q-value is maximal (ties are
    broken at random).
    """
    explore = np.random.binomial(1, epsilon) == 1
    if explore:
        return np.random.choice(actions)
    best = np.max(q_values)
    greedy_actions = [a for a, q in enumerate(q_values) if q == best]
    return np.random.choice(greedy_actions)
def apply_epsilon_greedy(n_bandits, action_space, n_trials, timesteps, epsilons, arms):
    """Run epsilon-greedy on the bandit problem for every epsilon in ``epsilons``.

    For each epsilon, ``n_trials`` independent runs of ``timesteps`` pulls
    are performed; per-step rewards and regrets are averaged over trials.

    Parameters
    ----------
    n_bandits : int
        Number of arms.
    action_space : np.ndarray
        Arm indices, e.g. ``np.arange(n_bandits)``.
    n_trials : int
        Independent repetitions per epsilon.
    timesteps : int
        Pulls per trial.
    epsilons : list of float
        Exploration rates to compare.
    arms : np.ndarray
        Scratch buffer of length ``timesteps``; records the arm pulled at
        each step of the current trial (mutated in place).

    Returns
    -------
    (rewards, regrets)
        Two lists of ``(trial-averaged curve, legend label)`` pairs, one
        entry per epsilon.
    """
    rewards = np.zeros((len(epsilons), n_trials, timesteps), dtype=float)
    regrets = np.zeros((len(epsilons), n_trials, timesteps), dtype=float)
    for eps_counter, eps in enumerate(epsilons):
        for trial in range(n_trials):
            # Per-arm pull counts and incremental value estimates.
            n = np.zeros(n_bandits, dtype=int)
            q = np.zeros(n_bandits, dtype=float)
            for t in range(timesteps):
                # 1. select an action with the current estimates and an
                #    eps-greedy policy, 2. execute it, 3. update beliefs.
                action = epsilon_greedy_policy(eps, action_space, q)
                r = step(action)
                # Online (incremental) mean update of Q for the pulled arm.
                q[action] = q[action] + (r - q[action]) / (n[action] + 1.0)
                n[action] += 1
                rewards[eps_counter, trial, t] += r
                arms[t] = action
            regret = compute_regret(bandit_probabilities, arms, timesteps)
            regrets[eps_counter, trial, :] += regret
    rewards = np.mean(rewards, axis=1)
    regrets = np.mean(regrets, axis=1)
    rewards = list(rewards)
    regrets = list(regrets)
    # Raw strings: the original "$\epsilon$..." literals relied on "\e" being
    # an unrecognised escape kept verbatim; r"..." produces identical bytes
    # and silences the invalid-escape-sequence warning on modern Python.
    legend_entries = [
        r"$\epsilon$-greedy $\epsilon=" + str(eps) + "$" for eps in epsilons
    ]
    rewards = list(zip(rewards, legend_entries))
    regrets = list(zip(regrets, legend_entries))
    return rewards, regrets
# Compare four exploration rates (0.0 = pure greedy, 0.9 = mostly random).
epsilons = [0.0, 0.01, 0.1, 0.9]
rewards_epsilon_greedy, regrets_epsilon_greedy = apply_epsilon_greedy(
    number_of_bandits, action_space, number_of_trials, timesteps, epsilons, arms
)
def plot_rewards(reward_list, n):
    """Plot the running-average reward curve of each (rewards, label) pair."""
    for curve, _label in reward_list:
        plt.plot(cumulative_average_mean(curve, n), linewidth=3)
    plt.xlabel("Steps")
    plt.ylabel("Average reward")
    plt.legend([label for _curve, label in reward_list])
    plt.show()
def plot_regrets(regrets):
    """Plot each regret curve (y-axis clipped to [0, 50]) with its label."""
    for curve, _label in regrets:
        plt.plot(curve, linewidth=3)
    plt.xlabel("Steps")
    plt.ylim(0.0, 50.0)
    plt.ylabel("Regret")
    plt.legend([label for _curve, label in regrets])
    plt.show()
# Visualise average reward and cumulative regret for each epsilon setting.
plot_rewards(rewards_epsilon_greedy, timesteps)
plot_regrets(regrets_epsilon_greedy)
In Monte Carlo methods, the value function of a state is estimated by the average of the returns following each visit or each first visit to that state in a set of episodes.
Incremental mean: $$\mu_k=\frac{1}{k}\sum_{j=1}^kx_j=\mu_{k-1}+\frac{1}{k}(x_k-\mu_{k-1})$$
Incremental MC updates: $$v^j(s_t)=v^{j-1}(s_t)+\frac{1}{N(s_t)}(G^j_t-v^{j-1}(s_t))$$
where $G^j_t$ is the total return at $j$-th episode and $N(s_t)$ is the number of times state $s_t$ was visited - the every-visit MC method.
def mc_policy_evaluation(state_count, r, value):
    """Incremental every-visit MC update: v <- v + (G - v) / N(s)."""
    correction = (r - value) / state_count
    return value + correction
# Reproducibility for the Blackjack evaluation below.
np.random.seed(123)
# Blackjack-v1: natural=False disables the natural-blackjack bonus,
# sab=False means the rules do not exactly follow Sutton & Barto.
env = gym.make("Blackjack-v1", natural=False, sab=False)
The observation consists of a 3-tuple containing:
Actions:
Rewards:
We would like to evaluate the following policy:
def policy(hand_sum):
    """Fixed Blackjack policy: stick (0) on 19 or more, otherwise hit (1)."""
    return 0 if hand_sum > 18 else 1
# Value tables indexed by [player_sum - 12, dealer_card - 1]
# (player sums 12..21 x dealer showing cards 1..10).
values_usable_ace = np.zeros((10, 10))
values_no_usable_ace = np.zeros_like(values_usable_ace)
# Visit counters N(s) for the incremental MC mean, per usable-ace case.
state_count_ace = np.zeros_like(values_usable_ace)
state_count_no_ace = np.zeros_like(state_count_ace)
episodes = 100000
# Every-visit Monte Carlo evaluation of `policy` on Blackjack.
for e in range(episodes):
    done = False
    # Gym >= 0.26: reset() returns (observation, info); the observation is a
    # 3-tuple (player_sum, dealer_card, usable_ace).
    obs = env.reset(seed=e)
    state_history = []
    g = []  # per-step rewards collected during this episode
    obs = obs[0]
    if obs[0] < 11:
        # Skip playing out trivially small starting hands.
        # NOTE(review): the state is still appended below, and a player sum
        # below 12 makes player_idx negative in the update loop (negative
        # indexing writes into the wrong table cell) — confirm intended.
        done = True
    state_history.append(obs)
    """
    We first play a game from beginning to end, following the policy that we need to evaluate
    """
    while not done:
        a = policy(obs[0])
        # NOTE(review): Gym >= 0.26 step() returns
        # (obs, reward, terminated, truncated, info); the names
        # "info, prob" here actually receive truncated and info.
        obs, r, done, info, prob = env.step(a)
        g.append(r)
        if done:
            break
        state_history.append(obs)
    final_reward = sum(g)
    """
    We then combine the information of a whole episode together,
    to get an estimate of the value of being in a certain state
    and following the given policy.
    Remark: this is an UNBIASED estimator
    """
    # Every-visit MC: credit the episode return to every visited state.
    for player_idx, dealer_idx, ace in state_history:
        player_idx -= 12  # map player sum 12..21 -> row 0..9
        dealer_idx -= 1  # map dealer card 1..10 -> column 0..9
        if ace:
            state_count_ace[player_idx, dealer_idx] += 1.0
            values_usable_ace[player_idx, dealer_idx] = mc_policy_evaluation(
                state_count_ace[player_idx, dealer_idx],
                final_reward,
                values_usable_ace[player_idx, dealer_idx],
            )
        else:
            state_count_no_ace[player_idx, dealer_idx] += 1.0
            values_no_usable_ace[player_idx, dealer_idx] = mc_policy_evaluation(
                state_count_no_ace[player_idx, dealer_idx],
                final_reward,
                values_no_usable_ace[player_idx, dealer_idx],
            )
from mpl_toolkits.mplot3d import Axes3D
def plot_v(values, ace=True):
    """Render the Blackjack state-value table as a 3-D surface.

    values: (10, 10) array indexed by [player_sum - 12, dealer_card - 1].
    ace: selects the plot title ("With Ace" / "Without Ace") only.
    """
    fig = plt.figure(figsize=(12, 8))
    axis = fig.add_subplot(projection="3d")
    axis.view_init(elev=15, azim=0, roll=0)
    player_sums = np.arange(12, 22)
    dealer_cards = np.arange(1, 11)
    X, Y = np.meshgrid(dealer_cards, player_sums)
    Z = values.reshape(X.shape)
    axis.plot_surface(X, Y, Z, cmap=plt.cm.coolwarm, linewidth=1, rstride=1, cstride=1)
    axis.set_title("With Ace" if ace else "Without Ace")
    axis.set_xlabel("Dealer Showing")
    axis.set_ylabel("Player Hand")
    axis.set_zlabel("State Value")
    plt.show()
# Surface plots of the evaluated policy, with and without a usable ace.
plot_v(values_usable_ace)
plot_v(values_no_usable_ace, ace=False)
Next, we will practice solving reinforcement problems by using function approximation. Specifically, we will consider the following exercise:
import sklearn.pipeline
from sklearn.kernel_approximation import RBFSampler
from sklearn.linear_model import LinearRegression
Before we start looking into our exercises, we provide a summary of some basic concepts.
On-policy methods aim to estimate the value of the policy that is used for control. Examples of on-policy methods are:
The updates of action estimates according to SARSA are of the following form:
$$ Q(s_t,a_t){\leftarrow}Q(s_t,a_t)+\alpha[r_{t+1}+{\gamma}Q(s_{t+1},a_{t+1})-Q(s_t,a_t)] $$An on-policy method updates action value estimates using the values of next state-action pairs where the next actions are generated by the current policy.
In off-policy methods, the behaviour policy that generates actions may be unrelated to the action that is being used in the target. Examples of off-policy methods include:
The updates of action estimates according to Q-learning are of the following form:
$$ Q(s_t,a_t){\leftarrow}Q(s_t,a_t)+\alpha[r_{t+1}+{\gamma}\max_{a}Q(s_{t+1},a)-Q(s_t,a_t)] $$Q-learning updates action value estimates using the values of next state-action pairs where the next actions are generated by a greedy policy. That is to say, the action-state values are estimated under the assumption of a greedy policy despite the fact that it is not following a greedy policy.
The Mountain Car problem is described in Example 10.1 (2nd Edition) in Sutton & Barto. The basic idea is driving the car to reach the goal position by building up momentum. We will use the existing environment in OpenAI Gym (ID: MountainCar-v0).
As the RBF kernel assumes all features are centred around 0 and have variance in the same order, we will normalize the data as the first step. For the task of normalization and feature engineering, we will use the scikit-learn library.
As this method falls into the category of online learning, we will define a normalizer and feature extractor using some pre-training data as follows:
We randomly sample 10000 observations as pre-training data for normalization.
# MountainCar-v0: reach the goal on the right hill by building momentum.
openai_env = "MountainCar-v0"
env = gym.make(openai_env, render_mode="rgb_array")
n_actions = env.action_space.n
actions = np.arange(n_actions)
sample_size = 10000
# Note: Alternatively create samples via env.observation_space.sample()
# Collect observations from random rollouts as pre-training data for the
# normaliser and feature extractor.
obs_samples = []
num_samples = 0
while num_samples < sample_size:
    env.reset()
    done = False
    while not done:
        # NOTE(review): Gym >= 0.26 step() returns
        # (obs, reward, terminated, truncated, info); only "terminated" is
        # captured as done here. The inner loop also plays each episode to
        # the end, so the final sample count can exceed sample_size.
        (
            observation,
            _,
            done,
            _,
            _,
        ) = env.step(env.action_space.sample())
        obs_samples.append(observation)
        num_samples += 1
# obs_samples = np.array([env.observation_space.sample() for i in range(sample_size)])
# print(obs_samples)
We approximate the action-value function by a linear combination of features, i.e.,
$$\hat{Q}(s,a,\mathbf{w})=\phi(s,a)^{\top}\mathbf{w}$$The objective function is the MSE between the true action value $Q(s,a)$ and $\hat{Q}(s,a,\mathbf{w})$:
$$ \mathcal{L(\mathbf{w})}=\frac{1}{2}\mathbb{E}_{s,a,r,s'}[(Q(s,a)-\hat{Q}(s,a,\mathbf{w}))^2] $$Differentiating the objective w.r.t. the parameter $\mathbf{w}$, we obtain the parameter update
$$ \mathbf{w}_{t+1}=\mathbf{w}_t+{\eta}(Q(s_t,a_t)-\hat{Q}(s_t,a_t,\mathbf{w}_t))\phi(s_t,a_t) $$Unfortunately, we don't have access to the true $Q(s,a)$ in practice. We therefore substitute a target for it.
We can substitute a SARSA target
$$Q(s_t,a_t)=r_{t}+\gamma\hat{Q}(s_{t+1},a_{t+1},\mathbf{w}_t)$$or a Q-learning target,
$$ Q(s_t,a_t)=r_{t}+\gamma\max_{a'}\hat{Q}(s_{t+1},a',\mathbf{w}_t) $$In the following implementation, we choose the SARSA target as a substitution to the true state-action values. The update for parameter $\mathbf{w}$ is therefore
$$ \mathbf{w}_{t+1}=\mathbf{w}_t+{\eta}(r_{t}+\gamma\hat{Q}(s_{t+1},a_{t+1},\mathbf{w}_t)-\hat{Q}(s_t,a_t,\mathbf{w}_t))\phi(s_t,a_t) $$class Approximator:
def __init__(self, obs, dim, n_a, alpha, discount):
self.__discount = discount
self.__alpha = alpha
self.__n_actions = n_a
self.__obs_samples = obs
self.__feature_dim = dim
self.__w_size = 4 * self.__feature_dim
self.__w = np.zeros((self.__n_actions, self.__w_size))
self.__scaler = None
self.__featuriser = None
self.__initialised = False
def initialise_scaler_featuriser(self):
    """Fit the observation normaliser and the RBF feature extractor.

    Fits a StandardScaler on the pre-training observations, then fits a
    union of four RBFSamplers with different kernel widths on the scaled
    data. Sets the initialised flag so this runs only once.
    """
    # Explicit import: the file only imports sklearn.pipeline; reaching
    # sklearn.preprocessing as an attribute relied on a side effect of
    # sklearn's internal imports.
    import sklearn.preprocessing

    self.__scaler = sklearn.preprocessing.StandardScaler().fit(self.__obs_samples)
    self.__featuriser = sklearn.pipeline.FeatureUnion(
        [
            ("rbf1", RBFSampler(gamma=5.0, n_components=self.__feature_dim)),
            ("rbf2", RBFSampler(gamma=2.0, n_components=self.__feature_dim)),
            ("rbf3", RBFSampler(gamma=1.0, n_components=self.__feature_dim)),
            ("rbf4", RBFSampler(gamma=0.5, n_components=self.__feature_dim)),
        ]
    )
    self.__featuriser.fit(self.__scaler.transform(self.__obs_samples))
    self.__initialised = True
@property
def get_w(self):
    # Read-only access to the (n_actions, 4 * feature_dim) weight matrix.
    return self.__w
def feature_transformation(self, state):
    """Map a raw observation to its (1, 4 * feature_dim) RBF feature row.

    Lazily fits the scaler and featuriser on the first call.
    """
    if not self.__initialised:
        self.initialise_scaler_featuriser()
    normalised = self.__scaler.transform([state])
    return self.__featuriser.transform(normalised)
def action_value_estimator(self, features, a):
return np.inner(features, self.__w[a])
# minimizing MSE between q(replaced by td target) and q_hat
def update_w(self, r, q, next_q, features, a):
target = r + self.__discount * next_q
td_error = target - q
w_gradient = self.__alpha * td_error * features
self.__w[a] = self.__w[a] + w_gradient
def set_w(self, a, new_w):
    # Overwrite the weight vector of action ``a``.
    self.__w[a] = new_w
def cost_to_go(self, state):
    """Negated best estimated action value at ``state`` (the cost to go)."""
    features = self.feature_transformation(state)
    q_estimates = [
        self.action_value_estimator(features, action)
        for action in range(self.__n_actions)
    ]
    return -np.max(q_estimates)
def plot_v(estimator, i=None):
    """Plot the cost-to-go surface of ``estimator`` over the MountainCar
    state space (position x velocity).

    i: optional episode number, shown in the title when given.
    NOTE: redefines the Blackjack ``plot_v`` defined earlier in this file.
    """
    if openai_env == "MountainCar-v0":
        fig = plt.figure(figsize=(12, 8))
        ax = fig.add_subplot(projection="3d")
        x = np.linspace(-1.2, 0.6, 30)  # MountainCar-v0 position range
        y = np.linspace(-0.07, 0.07, 30)  # MountainCar-v0 velocity range
        X, Y = np.meshgrid(x, y)
        states = np.dstack((X, Y))
        values = np.apply_along_axis(estimator.cost_to_go, 2, states)
        ax.plot_surface(
            X, Y, values, cmap=plt.cm.coolwarm, linewidth=1, rstride=1, cstride=1
        )
        extra_title = ""
        # "is not None" instead of truthiness: the original `if i:` silently
        # dropped the iteration count when i == 0 was passed.
        if i is not None:
            extra_title = " after {} iterations".format(i)
        ax.set_title("Cost to go function" + extra_title)
        ax.set_xlabel("Position")
        ax.set_ylabel("Velocity")
        ax.set_zlabel("Value")
        plt.show()
Based on the randomly-sampled 10000 observations, create an instance of the state-action function approximator and initialize the normaliser and feature extractor:
alpha = 0.1  # learning rate
gamma = 1.0  # discount factor (undiscounted episodic task)
epsilon = 0.1  # exploration rate
dim = 100  # RBF components per sampler (4 samplers -> 400 features)
# create an instance of approximator and initialize the normalizer and feature extractor
estimator = Approximator(obs_samples, dim, n_actions, alpha, gamma)
estimator.initialise_scaler_featuriser()
Play the game and do online updates for parameter $\mathbf{w}$,
def epsilon_greedy_policy(epsilon, actions, values):
    """Epsilon-greedy selection over estimated action values.

    With probability ``epsilon`` pick a uniformly random action; otherwise
    pick uniformly among the value-maximising actions (random tie-break).
    Redefines the bandit-section function of the same name.
    """
    if np.random.binomial(1, epsilon) == 1:
        return np.random.choice(actions)
    best = np.max(values)
    greedy = [a for a, v in enumerate(values) if v == best]
    return np.random.choice(greedy)
# On-policy SARSA training with the linear RBF approximator.
episodes = 50
for i in range(1, episodes + 1):
    # Gym >= 0.26: reset() returns (observation, info).
    state = env.reset()[0]
    a = env.action_space.sample()  # first action of the episode is random
    step_count = 0
    while True:
        step_count += 1
        """
        At each step, we first perform our action a (choosen in an
        epsilon-greedy way). This is possible because we can interrogate
        our blackbox as many time as we want to get a guess about the
        value of each action.
        Choosing a was done at the previous step.
        """
        # NOTE(review): the 5-tuple is (obs, reward, terminated, truncated,
        # info); "done" only captures terminated, and the terminal
        # transition is discarded (break happens before the update below).
        (
            next_state,
            r,
            done,
            _,
            _,
        ) = env.step(a)
        if done:
            break
        """
        We interrogate our blackbox to have a guess about the
        value of Q(s,a) for our eps-greedy action a.
        The feature thing is due to the fact that our state space
        is too big, so we use two neural networks instead of one,
        but one is just used to represent our states.
        """
        # NOTE(review): despite the note above, this is a linear model over
        # RBF features, not a neural network.
        features = estimator.feature_transformation(state)
        q_sa = estimator.action_value_estimator(features, a)
        next_feature = estimator.feature_transformation(next_state)
        """
        We need a guess about what the optimal policy would be,
        I.e. we need to be able to have an epsilon-guessing-optimal
        strategy.
        This bit is indeed very important. We essentially ask to
        the black-box for the list of guesses of values of each action.
        This allow us to do something similar to the value iteration.
        """
        # Estimated value of every action in the next state.
        q_values = []
        for j in actions:
            q_values.append(estimator.action_value_estimator(next_feature, j))
        """
        We now select the action a greedily with respect to our guesses.
        """
        next_a = epsilon_greedy_policy(epsilon, actions, q_values)
        next_q_sa = estimator.action_value_estimator(next_feature, next_a)
        """
        Here we update the network. This is done using on one side the actual
        return, on the other side our guesses.
        """
        # SARSA semi-gradient update of the weights for action a.
        estimator.update_w(r, q_sa, next_q_sa, features, a)
        a = next_a
        state = next_state
    # Show the cost-to-go surface every 10 episodes.
    if i % 10 == 0:
        plot_v(estimator, i)
plot_v(estimator)
import matplotlib.animation
import matplotlib
from IPython.display import HTML
# Roll out one greedy episode with the trained estimator and record frames.
rewards = 0
env.reset()
frames = []
action = env.action_space.sample()
while True:
    # 5-tuple unpack: (obs, reward, terminated, truncated, info).
    # NOTE(review): only "terminated" is treated as done here.
    (
        s,
        r,
        done,
        _,
        _,
    ) = env.step(action)
    frames.append(env.render())
    rewards = rewards + r
    if done:
        break
    # Pick the greedy action (epsilon = 0) w.r.t. the learned values.
    feature = estimator.feature_transformation(s)
    q_values = []
    for j in actions:
        q_values.append(estimator.action_value_estimator(feature, j))
    action = epsilon_greedy_policy(0, actions, q_values)
print(rewards)
# Stitch the recorded frames into an inline HTML animation.
plt.figure(figsize=(frames[0].shape[1] / 72.0, frames[0].shape[0] / 72.0), dpi=72)
patch = plt.imshow(frames[0])
plt.axis("off")
animate = lambda i: patch.set_data(frames[i])
ani = matplotlib.animation.FuncAnimation(
    plt.gcf(), animate, frames=len(frames), interval=50
)
HTML(ani.to_jshtml())
-131.0
Actor-Critic methods are policy gradient methods that represent the policy function independent of the value function.
A policy function (or policy) returns a probability distribution over actions that the agent can take based on the given state. A value function determines the expected return for an agent starting at a given state and acting according to a particular policy forever after.
In the Actor-Critic method, the policy is referred to as the actor that proposes a set of possible actions given a state, and the estimated value function is referred to as the critic, which evaluates actions taken by the actor based on the given policy.
In this tutorial, both the Actor and Critic will be represented using neural networks.
CartPole-v1
In the CartPole-v1 environment, a pole is attached to a cart moving along a frictionless track. The pole starts upright and the goal of the agent is to prevent it from falling over by applying a force of -1 or +1 to the cart. A reward of +1 is given for every time step the pole remains upright. An episode ends when (1) the pole is more than 15 degrees from vertical or (2) the cart moves more than 2.4 units from the center.
The problem is considered "solved" when the average total reward for the episode reaches 495 over 100 consecutive trials.
The code below is from this repository
Import necessary packages and configure global settings.
import gym, os
from itertools import count
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.distributions import Categorical
# Use the GPU when available, otherwise fall back to CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# .unwrapped strips the TimeLimit wrapper so episodes are not truncated.
env = gym.make(
    "CartPole-v1",
).unwrapped
state_size = env.observation_space.shape[0]  # 4 state variables
action_size = env.action_space.n  # 2 actions (push left / right)
lr = 0.0001  # NOTE(review): defined but never passed to the Adam optimizers below
The Actor and Critic will be modeled using neural networks that generate the action probabilities and the critic value respectively.
During the forward pass, the model will take in the state as the input and will output both action probabilities and critic value $V$, which models the state-dependent value function. The goal is to train a model that chooses actions based on a policy $\pi$ that maximizes expected return.
For Cartpole-v1, there are four values representing the state: cart position, cart-velocity, pole angle and pole velocity respectively. The agent can take two actions to push the cart left (0) and right (1) respectively.
# Fix the PyTorch RNG for reproducible weight initialisation and sampling.
torch.manual_seed(2024510)
class Actor(nn.Module):
    """Policy network: maps a state to a Categorical distribution over actions."""

    def __init__(self, state_size, action_size):
        super(Actor, self).__init__()
        self.state_size = state_size
        self.action_size = action_size
        self.linear1 = nn.Linear(self.state_size, 128)
        self.linear2 = nn.Linear(128, 256)
        self.linear3 = nn.Linear(256, self.action_size)

    def forward(self, state):
        hidden = F.relu(self.linear1(state))
        hidden = F.relu(self.linear2(hidden))
        logits = self.linear3(hidden)
        return Categorical(F.softmax(logits, dim=-1))
class Critic(nn.Module):
    """Value network: maps a state to a scalar state-value estimate V(s)."""

    def __init__(self, state_size, action_size):
        super(Critic, self).__init__()
        self.state_size = state_size
        self.action_size = action_size
        self.linear1 = nn.Linear(self.state_size, 128)
        self.linear2 = nn.Linear(128, 256)
        self.linear3 = nn.Linear(256, 1)

    def forward(self, state):
        hidden = F.relu(self.linear1(state))
        hidden = F.relu(self.linear2(hidden))
        return self.linear3(hidden)
The sequence of rewards for each timestep $t$, $\{r_{t}\}^{T}_{t=1}$ collected during one episode is converted into a sequence of expected returns $\{G_{t}\}^{T}_{t=1}$ in which the sum of rewards is taken from the current timestep $t$ to $T$ and each reward is multiplied with an exponentially decaying discount factor $\gamma$:
$$G_{t} = \sum^{T}_{t'=t} \gamma^{t'-t}r_{t'}$$Since $\gamma\in(0,1)$, rewards further out from the current timestep are given less weight.
Intuitively, expected return simply implies that rewards now are better than rewards later. In a mathematical sense, it is to ensure that the sum of the rewards converges.
def compute_returns(next_value, rewards, masks, gamma=0.99):
    """Discounted returns G_t, computed backwards from a bootstrap value.

    masks[t] is 0 at terminal steps, which cuts the bootstrap chain.
    Returns a list aligned with ``rewards``.
    """
    returns = []
    running = next_value
    for reward, mask in zip(reversed(rewards), reversed(masks)):
        running = reward + gamma * running * mask
        returns.append(running)
    returns.reverse()
    return returns
The actor loss is based on policy gradients with the critic as a state dependent baseline and computed with single-sample (per-episode) estimates.
$$L_{actor} = -\sum^{T}_{t=1} \log\pi_{\theta}(a_{t} | s_{t})[G(s_{t}, a_{t}) - V^{\pi}_{\theta}(s_{t})]$$where:
A negative term is added to the sum since the idea is to maximize the probabilities of actions yielding higher rewards by minimizing the combined loss.
The $G - V$ term in our $L_{actor}$ formulation is called the advantage, which indicates how much better an action is given a particular state over a random action selected according to the policy $\pi$ for that state.
While it's possible to exclude a baseline, this may result in high variance during training. And the nice thing about choosing the critic $V$ as a baseline is that it is trained to be as close as possible to $G$, leading to a lower variance.
In addition, without the critic, the algorithm would try to increase probabilities for actions taken on a particular state based on expected return, which may not make much of a difference if the relative probabilities between actions remain the same.
For instance, suppose that two actions for a given state would yield the same expected return. Without the critic, the algorithm would try to raise the probability of these actions based on the objective $J$. With the critic, it may turn out that there's no advantage ($G - V = 0$) and thus no benefit gained in increasing the actions' probabilities and the algorithm would set the gradients to zero.
Training $V$ to be as close as possible to $G$ can be set up as a regression problem with the following loss function:
$$L_{critic} = L_{\delta}(G, V^{\pi}_{\theta})$$where $L_{\delta}$ is the squared-error loss.
To train the agent, you will follow these steps:
Note that this method differs algorithmically from the lecture and also from the Sutton-Barto Book. The weights are updated online after each step, while the implementation in this Notebook only updates parameters in an offline manner at the end of an episode.
def trainIters(actor, critic, n_iters):
    """Train actor and critic on CartPole for ``n_iters`` episodes.

    Each episode is collected in full with the current policy, then one
    offline update is performed: a policy-gradient loss with the critic as
    baseline for the actor, and the squared advantage for the critic.

    Parameters
    ----------
    actor : Actor
        Policy network producing a Categorical action distribution.
    critic : Critic
        State-value network used as the baseline.
    n_iters : int
        Number of training episodes.
    """
    # NOTE(review): the module-level ``lr`` is not used here; Adam runs with
    # its default learning rate, exactly as in the original code.
    optimizerA = optim.Adam(actor.parameters())
    optimizerC = optim.Adam(critic.parameters())
    # "episode" replaces the original loop name "iter", which shadowed the
    # builtin of the same name.
    for episode in range(n_iters):
        state = env.reset(seed=episode)
        state = state[0]
        log_probs = []
        values = []
        rewards = []
        masks = []
        for i in count():
            state = torch.FloatTensor(state).to(device)
            dist, value = actor(state), critic(state)
            action = dist.sample()
            # Gym >= 0.26 step() returns (obs, reward, terminated,
            # truncated, info); only "terminated" is used as done here.
            next_state, reward, done, _, _ = env.step(action.cpu().numpy())
            log_prob = dist.log_prob(action).unsqueeze(0)
            log_probs.append(log_prob)
            values.append(value)
            rewards.append(torch.tensor([reward], dtype=torch.float, device=device))
            masks.append(torch.tensor([1 - done], dtype=torch.float, device=device))
            state = next_state
            if done:
                if episode % 10 == 0:
                    print("Iteration: {}, Score: {}".format(episode, i))
                break
        # Bootstrap from the final state, then compute discounted returns.
        next_state = torch.FloatTensor(next_state).to(device)
        next_value = critic(next_state)
        returns = compute_returns(next_value, rewards, masks)
        log_probs = torch.cat(log_probs)
        returns = torch.cat(returns).detach()
        values = torch.cat(values)
        # Advantage A = G - V; detached in the actor loss so the policy
        # gradient does not backpropagate into the critic.
        advantage = returns - values
        actor_loss = -(log_probs * advantage.detach()).mean()
        critic_loss = advantage.pow(2).mean()
        optimizerA.zero_grad()
        optimizerC.zero_grad()
        actor_loss.backward()
        critic_loss.backward()
        optimizerA.step()
        optimizerC.step()
    env.close()
import warnings
warnings.filterwarnings("ignore")
# Instantiate both networks on the chosen device and train for 200 episodes.
actor = Actor(state_size, action_size).to(device)
critic = Critic(state_size, action_size).to(device)
trainIters(actor, critic, n_iters=200)
Iteration: 0, Score: 11 Iteration: 10, Score: 19 Iteration: 20, Score: 12 Iteration: 30, Score: 11 Iteration: 40, Score: 24 Iteration: 50, Score: 22 Iteration: 60, Score: 21 Iteration: 70, Score: 10 Iteration: 80, Score: 42 Iteration: 90, Score: 27 Iteration: 100, Score: 27 Iteration: 110, Score: 93 Iteration: 120, Score: 99 Iteration: 130, Score: 145 Iteration: 140, Score: 615 Iteration: 150, Score: 427 Iteration: 160, Score: 293 Iteration: 170, Score: 711 Iteration: 180, Score: 405 Iteration: 190, Score: 578
The content of this notebook accompanies Chapter 8 in the Sutton & Barto book.

